package com.atguigu.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: declaring a processing-time attribute directly in a SQL DDL statement.
 *
 * <p>Creates a {@code sensor} table whose definition contains the computed column
 * {@code pt as proctime()}, then selects every row from that table and prints the result.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/6/19 9:26
 */
public class Flink06_Table_Time_Processing_DDL {

    /** Column list of the sensor table; the last entry defines the processing-time attribute. */
    private static final String SENSOR_COLUMNS =
        "   id string, " +
        "   ts bigint," +
        "   vc int, " +
        "   pt as proctime()";  // define the processing-time attribute

    /** Connector options: CSV-formatted data read through the filesystem connector. */
    private static final String SENSOR_OPTIONS =
        "   'connector' = 'filesystem'," +
        "   'path' = 'input/sensor.txt', " +
        "   'format' = 'csv'";

    /** Complete {@code create table} statement assembled from the pieces above. */
    private static final String SENSOR_DDL =
        "create table sensor(" + SENSOR_COLUMNS + ")with(" + SENSOR_OPTIONS + ")";

    /** Query that reads every column of every row from the sensor table. */
    private static final String SELECT_ALL = "select * from sensor";

    /**
     * Entry point: sets up the stream and table environments, registers the sensor table
     * and prints the outcome of a full-table select.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared for any failure raised while setting up or running the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);

        // Register the table first so that the query below can resolve it.
        tableEnv.executeSql(SENSOR_DDL);

        tableEnv.sqlQuery(SELECT_ALL)
                .execute()
                .print();
    }
}
